package collectors

import (
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"github.com/go-kit/kit/log"
	"io"
	"os"
	"path/filepath"
	"strings"
)

type CollectorService interface {
	ReportMessage(ctx context.Context, report MessageReport) error
	ReportUpload(ctx context.Context, report UploadReport) error
	ReportCollectorJoinChatroom(ctx context.Context, report CollectorJoinChatroomReport) error
	ReportCollectorLeaveChatroom(ctx context.Context, report CollectorLeaveChatroomReport) error
	ReportMemberJoinChatroom(ctx context.Context, report MemberJoinChatroomReport) error
	ReportMemberLeaveChatroom(ctx context.Context, report MemberLeaveChatroomReport) error
	ReportChatroomMember(ctx context.Context, report ChatroomMemberReport) error
	ReportLog(ctx context.Context, report LogReport) error
	ReportHeartbeat(ctx context.Context, report HeartBeatReport) error
}

type collectorService struct {
	logger    log.Logger
	repo      CollectorReporterRepository
	uploadDir string
}

func NewCollectorService(logger log.Logger, repo CollectorReporterRepository, uploadDir string) CollectorService {
	return &collectorService{
		logger:    logger,
		repo:      repo,
		uploadDir: uploadDir,
	}
}

func (svc *collectorService) ReportMessage(ctx context.Context, report MessageReport) error {
	if report.Type == "text" {
		return svc.repo.ReportTextMessage(ctx, report)
	}
	return svc.repo.ReportOtherMessage(ctx, report)
}

func (svc *collectorService) ReportUpload(ctx context.Context, report UploadReport) error {
	defer func() {
		_ = report.File.Close()
	}()
	if len(report.MessageReport.FileMd5) != 32 {
		return errors.New("md5格式错误")
	}
	filename := strings.ToUpper(fmt.Sprintf("%s%016X", report.MessageReport.FileMd5, report.MessageReport.FileSize))
	report.MessageReport.FilePath = filename
	filePath := filepath.Join(svc.uploadDir, filename)
	_, err := os.Stat(filePath)
	if err != nil {
		tmpFilePath := filepath.Join(svc.uploadDir, "TMP"+filename)
		tmpWriter, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
		if err != nil {
			return err
		}
		defer func() {
			if err := tmpWriter.Close(); err != nil {
				_ = svc.logger.Log("err", err.Error())
			}
		}()
		defer func() {
			_, err := os.Stat(tmpFilePath)
			if err == nil {
				if err := os.Remove(tmpFilePath); err != nil {
					_ = svc.logger.Log("err", err.Error())
				}
			}
		}()
		hasher := md5.New()
		buf := make([]byte, 4096)
		for {
			n, err := report.File.Read(buf)
			if err != nil {
				if err == io.EOF {
					break
				} else {
					return err
				}
			}
			if _, err := hasher.Write(buf[:n]); err != nil {
				return err
			}
			if _, err := tmpWriter.Write(buf[:n]); err != nil {
				return err
			}
		}
		fileMd5 := fmt.Sprintf("%X", hasher.Sum(nil))
		gotMd5 := strings.ToUpper(report.MessageReport.FileMd5)
		if fileMd5 != gotMd5 {
			_ = svc.logger.Log("err", "md5不匹配", "expected", fileMd5, "got", gotMd5)
			return fmt.Errorf("md5不匹配: expected: %q, got: %q", fileMd5, gotMd5)
		}
		if err := os.Rename(tmpFilePath, filePath); err != nil {
			return err
		}
	}
	return svc.repo.ReportOtherMessage(ctx, report.MessageReport)
}

func (svc *collectorService) ReportCollectorJoinChatroom(ctx context.Context, report CollectorJoinChatroomReport) error {
	return svc.repo.ReportCollectorJoinChatroom(ctx, report)
}

func (svc *collectorService) ReportCollectorLeaveChatroom(ctx context.Context, report CollectorLeaveChatroomReport) error {
	return svc.repo.ReportCollectorLeaveChatroom(ctx, report)
}

func (svc *collectorService) ReportMemberJoinChatroom(ctx context.Context, report MemberJoinChatroomReport) error {
	return svc.repo.ReportMemberJoinChatroom(ctx, report)
}

func (svc *collectorService) ReportMemberLeaveChatroom(ctx context.Context, report MemberLeaveChatroomReport) error {
	return svc.repo.ReportMemberLeaveChatroom(ctx, report)
}

func (svc *collectorService) ReportChatroomMember(ctx context.Context, report ChatroomMemberReport) error {
	return svc.repo.ReportChatroomMember(ctx, report)
}

func (svc *collectorService) ReportLog(ctx context.Context, report LogReport) error {
	return svc.repo.ReportLog(ctx, report)
}

func (svc *collectorService) ReportHeartbeat(ctx context.Context, report HeartBeatReport) error {
	return svc.repo.ReportHeartbeat(ctx, report)
}
